home *** CD-ROM | disk | FTP | other *** search
- #!jython
- #Python Serial Port Extension for Win32, Linux, BSD, Jython
- #module for serial IO for Jython and JavaComm
- #see __init__.py
- #
- #(C) 2002-2003 Chris Liechti <cliechti@gmx.net>
- # this is distributed under a free software license, see license.txt
-
- import javax.comm
- from serialutil import *
-
- VERSION = "$Revision: 1.10 $".split()[1] #extract CVS version
-
-
- def device(portnumber):
- """Turn a port number into a device name"""
- enum = javax.comm.CommPortIdentifier.getPortIdentifiers()
- ports = []
- while enum.hasMoreElements():
- el = enum.nextElement()
- if el.getPortType() == javax.comm.CommPortIdentifier.PORT_SERIAL:
- ports.append(el)
- return ports[portnumber].getName()
-
- class Serial(SerialBase):
- """Serial port class, implemented with javax.comm and thus usable with
- jython and the appropriate java extension."""
-
- def open(self):
- """Open port with current settings. This may throw a SerialException
- if the port cannot be opened."""
- if self._port is None:
- raise SerialException("Port must be configured before it can be used.")
- if type(self._port) == type(''): #strings are taken directly
- portId = javax.comm.CommPortIdentifier.getPortIdentifier(self._port)
- else:
- portId = javax.comm.CommPortIdentifier.getPortIdentifier(device(self._port)) #numbers are transformed to a comportid obj
- try:
- self.sPort = portId.open("python serial module", 10)
- except Exception, msg:
- self.sPort = None
- raise SerialException("Could not open port: %s" % msg)
- self._reconfigurePort()
- self._instream = self.sPort.getInputStream()
- self._outstream = self.sPort.getOutputStream()
- self._isOpen = True
-
- def _reconfigurePort(self):
- """Set commuication parameters on opened port."""
- if not self.sPort:
- raise SerialException("Can only operate on a valid port handle")
-
- self.sPort.enableReceiveTimeout(30)
- if self._bytesize == FIVEBITS:
- jdatabits = javax.comm.SerialPort.DATABITS_5
- elif self._bytesize == SIXBITS:
- jdatabits = javax.comm.SerialPort.DATABITS_6
- elif self._bytesize == SEVENBITS:
- jdatabits = javax.comm.SerialPort.DATABITS_7
- elif self._bytesize == EIGHTBITS:
- jdatabits = javax.comm.SerialPort.DATABITS_8
- else:
- raise ValueError("unsupported bytesize: %r" % self._bytesize)
-
- if self._stopbits == STOPBITS_ONE:
- jstopbits = javax.comm.SerialPort.STOPBITS_1
- elif stopbits == STOPBITS_ONE_HALVE:
- self._jstopbits = javax.comm.SerialPort.STOPBITS_1_5
- elif self._stopbits == STOPBITS_TWO:
- jstopbits = javax.comm.SerialPort.STOPBITS_2
- else:
- raise ValueError("unsupported number of stopbits: %r" % self._stopbits)
-
- if self._parity == PARITY_NONE:
- jparity = javax.comm.SerialPort.PARITY_NONE
- elif self._parity == PARITY_EVEN:
- jparity = javax.comm.SerialPort.PARITY_EVEN
- elif self._parity == PARITY_ODD:
- jparity = javax.comm.SerialPort.PARITY_ODD
- #~ elif self._parity == PARITY_MARK:
- #~ jparity = javax.comm.SerialPort.PARITY_MARK
- #~ elif self._parity == PARITY_SPACE:
- #~ jparity = javax.comm.SerialPort.PARITY_SPACE
- else:
- raise ValueError("unsupported parity type: %r" % self._parity)
-
- jflowin = jflowout = 0
- if self._rtscts:
- jflowin |= javax.comm.SerialPort.FLOWCONTROL_RTSCTS_IN
- jflowout |= javax.comm.SerialPort.FLOWCONTROL_RTSCTS_OUT
- if self._xonxoff:
- jflowin |= javax.comm.SerialPort.FLOWCONTROL_XONXOFF_IN
- jflowout |= javax.comm.SerialPort.FLOWCONTROL_XONXOFF_OUT
-
- self.sPort.setSerialPortParams(baudrate, jdatabits, jstopbits, jparity)
- self.sPort.setFlowControlMode(jflowin | jflowout)
-
- if self._timeout >= 0:
- self.sPort.enableReceiveTimeout(self._timeout*1000)
- else:
- self.sPort.disableReceiveTimeout()
-
- def close(self):
- """Close port"""
- if self._isOpen:
- if self.sPort:
- self._instream.close()
- self._outstream.close()
- self.sPort.close()
- self.sPort = None
- self._isOpen = False
-
- def makeDeviceName(self, port):
- return device(port)
-
- # - - - - - - - - - - - - - - - - - - - - - - - -
-
- def inWaiting(self):
- """Return the number of characters currently in the input buffer."""
- if not self.sPort: raise portNotOpenError
- return self._instream.available()
-
- def read(self, size=1):
- """Read size bytes from the serial port. If a timeout is set it may
- return less characters as requested. With no timeout it will block
- until the requested number of bytes is read."""
- if not self.sPort: raise portNotOpenError
- read = ''
- if size > 0:
- while len(read) < size:
- x = self._instream.read()
- if x == -1:
- if self.timeout >= 0:
- break
- else:
- read = read + chr(x)
- return read
-
- def write(self, data):
- """Output the given string over the serial port."""
- if not self.sPort: raise portNotOpenError
- self._outstream.write(data)
-
- def flushInput(self):
- """Clear input buffer, discarding all that is in the buffer."""
- if not self.sPort: raise portNotOpenError
- self._instream.skip(self._instream.available())
-
- def flushOutput(self):
- """Clear output buffer, aborting the current output and
- discarding all that is in the buffer."""
- if not self.sPort: raise portNotOpenError
- self._outstream.flush()
-
- def sendBreak(self, duration=0.25):
- """Send break condition."""
- if not self.sPort: raise portNotOpenError
- self.sPort.sendBreak(duration*1000.0)
-
- def setRTS(self, level=1):
- """Set terminal status line: Request To Send"""
- if not self.sPort: raise portNotOpenError
- self.sPort.setRTS(level)
-
- def setDTR(self, level=1):
- """Set terminal status line: Data Terminal Ready"""
- if not self.sPort: raise portNotOpenError
- self.sPort.setDTR(level)
-
- def getCTS(self):
- """Read terminal status line: Clear To Send"""
- if not self.sPort: raise portNotOpenError
- self.sPort.isCTS()
-
- def getDSR(self):
- """Read terminal status line: Data Set Ready"""
- if not self.sPort: raise portNotOpenError
- self.sPort.isDSR()
-
- def getRI(self):
- """Read terminal status line: Ring Indicator"""
- if not self.sPort: raise portNotOpenError
- self.sPort.isRI()
-
- def getCD(self):
- """Read terminal status line: Carrier Detect"""
- if not self.sPort: raise portNotOpenError
- self.sPort.isCD()
-
-
-
- if __name__ == '__main__':
- s = Serial(0,
- baudrate=19200, #baudrate
- bytesize=EIGHTBITS, #number of databits
- parity=PARITY_EVEN, #enable parity checking
- stopbits=STOPBITS_ONE, #number of stopbits
- timeout=3, #set a timeout value, None for waiting forever
- xonxoff=0, #enable software flow control
- rtscts=0, #enable RTS/CTS flow control
- )
- s.setRTS(1)
- s.setDTR(1)
- s.flushInput()
- s.flushOutput()
- s.write('hello')
- print repr(s.read(5))
- print s.inWaiting()
- del s
-
-
-